跳过主内容
版本: 5.0

RocketMQ Connect 实践 4

SFTP 服务器 (文件数据) -> RocketMQ Connect -> SFTP 服务器 (文件)

准备工作

启动 RocketMQ

  1. Linux/Unix/Mac
  2. 64位 JDK 1.8+;
  3. Maven 3.2.x+;
  4. 启动 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本;
  5. 使用工具测试 RocketMQ 消息发送和接收。

这里,使用环境变量 NAMESRV_ADDR 告知工具客户端 RocketMQ 的 NameServer 地址为 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意: RocketMQ 具有自动创建 Topic 和 Group 的功能。当发送或订阅消息时,如果对应的 Topic 或 Group 不存在,RocketMQ 将自动创建它们。因此,无需提前创建 Topic 和 Group。

构建连接器运行时

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

构建 SFTP 连接器插件

cd $RMQ_CONNECT_HOME/connectors/rocketmq-connect-sftp/

mvn clean package -Dmaven.test.skip=true

将 SFTP RocketMQ 连接器编译后的 jar 包放入 Plugin 目录,以便运行时加载。

mkdir -p /Users/YourUsername/rocketmqconnect/connector-plugins
cp target/rocketmq-connect-sftp-0.0.1-SNAPSHOT-jar-with-dependencies.jar /Users/YourUsername/rocketmqconnect/connector-plugins

以独立模式运行连接器 Worker

修改 connect-standalone.conf 文件,配置 RocketMQ 连接地址及其他信息。

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

示例配置信息如下

workerId=standalone-worker
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

## Http port for user to access REST API
httpPort=8082

# Rocketmq namesrvAddr
namesrvAddr=localhost:9876

# RocketMQ acl
aclEnable=false
#accessKey=rocketmq
#secretKey=12345678

clusterName="DefaultCluster"

# Plugin path for loading Source/Sink Connectors
pluginPaths=/Users/YourUsername/rocketmqconnect/connector-plugins

在独立模式下,RocketMQ Connect 将同步检查点信息持久化存储在 storePathRootDir 指定的本地文件目录中。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果需要重置同步检查点,则需要删除持久化的检查点信息文件。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

以独立模式启动连接器 Worker

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

搭建 SFTP 服务器

SFTP(SSH 文件传输协议)是一种用于计算机之间安全文件传输的文件传输协议。SFTP 基于 SSH(安全外壳协议)构建,并利用加密和身份验证。

我们将使用 macOS 内置的 SFTP 服务(通过启用“远程登录”访问)。有关详细说明,请参阅 允许远程电脑访问您的 Mac 文档。

创建源测试文件

创建名为 source.txt 的测试文件并向其中写入一些测试数据

mkdir -p /Users/YourUsername/rocketmqconnect/sftp-test/

cd /Users/YourUsername/rocketmqconnect/sftp-test/

touch source.txt

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

登录 SFTP 服务,验证是否可以正常访问。输入以下命令,然后输入您的密码

# sftp -P port YourUsername@hostname
sftp -P 22 YourUsername@127.0.0.1

注意: 由于这是您本地 MAC OS 提供的 SFTP 服务,因此地址是 127.0.0.1 端口是默认的 22。

sftp> cd /Users/YourUsername/rocketmqconnect/sftp-test/
sftp> ls source.txt
sftp> bye

启动连接器

启动 SFTP Source 连接器

运行以下命令启动 SFTP source 连接器。此连接器将连接到 SFTP 服务以读取 source.txt 文件。对于文件中的每一行文本,连接器将解析并打包内容到通用 ConnectRecord 对象中,然后将其发送到 RocketMQ topic 以供 sink 连接器消费。

curl -X POST --location "http://localhost:8082/connectors/SftpSourceConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSourceConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/source.txt",
"connect.topicname": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

如果 curl 请求返回状态:200,则表示连接器创建成功。示例响应如下

{"status":200,"body":{"connector.class":"...

要确认文件源连接器已成功启动,请运行以下命令

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

启动连接器 SftpSourceConnector 并设置目标状态 STARTED 成功!!

启动 SFTP Sink 连接器

运行以下命令启动 SFTP sink 连接器。此连接器将订阅 RocketMQ topic 以消费消息,并将每条消息转换为单行文本,然后使用 SFTP 协议写入目标文件 sink.txt

curl -X POST --location "http://localhost:8082/connectors/SftpSinkConnector" --http1.1 \
-H "Host: localhost:8082" \
-H "Content-Type: application/json" \
-d '{
"connector.class": "org.apache.rocketmq.connect.http.sink.SftpSinkConnector",
"host": "127.0.0.1",
"port": 22,
"username": "YourUsername",
"password": "yourPassword",
"filePath": "/Users/YourUsername/rocketmqconnect/sftp-test/sink.txt",
"connect.topicnames": "sftpTopic",
"fieldSeparator": "|",
"fieldSchema": "username|idCardNo|orderNo|orderAmount|trxDate|trxTime|profit"
}'

如果 curl 请求返回状态:200,则表示连接器创建成功。示例响应如下

{"status":200,"body":{"connector.class":"...

检查日志以确认 SFTP sink 连接器成功启动

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

启动连接器 SftpSinkConnector 并设置目标状态 STARTED 成功!!

运行以下命令确认数据已写入目标文件

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

如果 sink.txt 文件已生成且其内容与 source.txt 文件匹配,则整个过程正常工作。

source.txt 文件写入更多测试数据以继续测试

cd /Users/YourUsername/rocketmqconnect/sftp-test/

echo 'John Doe|100000202211290001|20221129001|30000.00|2022-11-28|03:00:00|7.00
Jane Smith|100000202211290002|20221129002|40000.00|2022-11-28|04:00:00|9.00
Bob Johnson|100000202211290003|20221129003|50000.00|2022-11-28|05:00:00|12.00' >> source.txt

# Wait a few seconds to give the connector time to replicate data to the sink file.
sleep 10

cat /Users/YourUsername/rocketmqconnect/sftp-test/sink.txt

注意: 文件内容的顺序可能有所不同,因为 rocketmq-connect-sftp 在向 RocketMQ topic 发送和接收消息时使用 普通消息。这与 顺序消息 不同,消费 普通消息 不保证顺序。